/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.search;

import static org.apache.solr.core.SolrCore.verbose;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.util.Constants;
import org.apache.solr.common.util.Utils;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.TestHarness;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Stress test for update-log buffering and replay.
 *
 * <p>Writer threads send adds, deletes and delete-by-queries that carry explicit versions and are
 * flagged as coming from the leader, reader threads compare query results against an in-memory
 * model, and the main thread repeatedly puts the update log into buffering mode and then applies
 * the buffered updates while the other threads keep running.
 */
@LuceneTestCase.AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028") // 6-Sep-2018
// can fail due to NPE uncaught exception in stress thread, probably because of null core
public class TestStressRecovery extends TestRTGBase {

  /**
   * Initializes the core from {@code solrconfig-tlog.xml} and {@code schema15.xml}. Despite the
   * method name, this is a {@code @Before} hook and therefore runs before every test method.
   */
  @Before
  public void beforeClass() throws Exception {
    initCore("solrconfig-tlog.xml", "schema15.xml");
  }

  /**
   * Deletes the core. Despite the method name, this is an {@code @After} hook and therefore runs
   * after every test method.
   */
  @After
  public void afterClass() {
    deleteCore();
  }

  // This points to the live model when state is ACTIVE, but a snapshot of the
  // past when recovering.
  volatile ConcurrentHashMap<Integer, DocInfo> visibleModel;

  /**
   * Simulates updates coming from the leader, sometimes reordered, and tests the ability to buffer
   * updates and apply them later.
   *
   * <p>The main thread performs a number of "recovery rounds" (the initial value of {@code
   * operations}); in each round it snapshots the model, starts buffering, applies the buffered
   * updates, and throttles writers and readers through semaphores while waiting for the replay to
   * finish. Any failure in a worker thread sets {@code operations} to -1, which stops all loops.
   *
   * @throws Exception if the main thread is interrupted or a helper call fails
   */
  @Test
  public void testStressRecovery() throws Exception {
    assumeFalse("FIXME: This test is horribly slow sometimes on Windows!", Constants.WINDOWS);

    // All *Percent values below are thresholds on a roll in [0, 100).
    final int commitPercent = 5 + random().nextInt(10);
    final int softCommitPercent = 30 + random().nextInt(75); // what percent of the commits are soft
    final int deletePercent = 4 + random().nextInt(25);
    final int deleteByQueryPercent = random().nextInt(5);
    final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
    int nWriteThreads =
        2 + random().nextInt(10); // fewer write threads to give recovery thread more of a chance

    final int maxConcurrentCommits = nWriteThreads;

    // query variables
    final int percentRealtimeQuery = 75;
    final int percentGetLatestVersions = random().nextInt(4);
    final AtomicLong operations =
        new AtomicLong(atLeast(35)); // number of recovery loops to perform
    int nReadThreads =
        2 + random().nextInt(10); // fewer read threads to give writers more of a chance

    initModel(ndocs);

    final AtomicInteger numCommitting = new AtomicInteger();

    List<Thread> threads = new ArrayList<>();

    // source of the explicit versions the writers put on their updates
    final AtomicLong testVersion = new AtomicLong(0);

    final UpdateHandler uHandler = h.getCore().getUpdateHandler();
    final UpdateLog uLog = uHandler.getUpdateLog();
    // guards the update log state transition against concurrently running commits
    final Object stateChangeLock = new Object();
    this.visibleModel = model;

    // One semaphore per writer so the main thread can idle writers one at a time.
    final Semaphore[] writePermissions = new Semaphore[nWriteThreads];
    for (int i = 0; i < nWriteThreads; i++) {
      writePermissions[i] = new Semaphore(Integer.MAX_VALUE, false);
    }

    final Semaphore readPermission = new Semaphore(Integer.MAX_VALUE, false);

    for (int i = 0; i < nWriteThreads; i++) {
      final int threadNum = i;

      Thread thread =
          new Thread("WRITER" + i) {
            Random rand = new Random(random().nextInt());
            Semaphore writePermission = writePermissions[threadNum];

            @Override
            public void run() {
              try {
                while (operations.get() > 0) {
                  writePermission.acquire();

                  // Roll in [0, 100) so the *Percent thresholds below really are percentages.
                  // A smaller bound makes nearly every operation a commit or a delete and
                  // leaves the add branch practically unreachable.
                  int oper = rand.nextInt(100);

                  if (oper < commitPercent) {
                    if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
                      Map<Integer, DocInfo> newCommittedModel;
                      long version;

                      synchronized (globalLock) {
                        newCommittedModel = new HashMap<>(model); // take a snapshot
                        version = snapshotCount++;
                      }

                      synchronized (stateChangeLock) {
                        // These commits won't take effect if we are in recovery mode,
                        // so change the version to -1, so we won't update our model.
                        if (uLog.getState() != UpdateLog.State.ACTIVE) {
                          version = -1;
                        }
                        if (rand.nextInt(100) < softCommitPercent) {
                          verbose("softCommit start");
                          assertU(TestHarness.commit("softCommit", "true"));
                          verbose("softCommit end");
                        } else {
                          verbose("hardCommit start");
                          assertU(commit());
                          verbose("hardCommit end");
                        }
                      }

                      synchronized (globalLock) {
                        // install this model snapshot only if it's newer than the current one
                        // install this model only if we are not in recovery mode.
                        if (version >= committedModelClock) {
                          if (VERBOSE) {
                            verbose("installing new committedModel version=" + committedModelClock);
                          }
                          committedModel = newCommittedModel;
                          committedModelClock = version;
                        }
                      }
                    }
                    // always undo the increment, whether or not this thread got to commit
                    numCommitting.decrementAndGet();
                    continue;
                  }

                  int id;

                  if (rand.nextBoolean()) {
                    id = rand.nextInt(ndocs);
                  } else {
                    id = lastId; // reuse the last ID half of the time to force more race conditions
                  }

                  // set the lastId before we actually change it sometimes to try and
                  // uncover more race conditions between writing and reading
                  boolean before = rand.nextBoolean();
                  if (before) {
                    lastId = id;
                  }

                  DocInfo info = model.get(id);

                  // a negative val in the model marks a deleted doc, hence the abs()
                  long val = info.val;
                  long nextVal = Math.abs(val) + 1;

                  // the version we set on the update should determine who wins
                  // These versions are not derived from the actual leader update handler and hence
                  // this test may need to change depending on how we handle version numbers.
                  long version = testVersion.incrementAndGet();

                  // yield after getting the next version to increase the odds of updates happening
                  // out of order
                  if (rand.nextBoolean()) {
                    Thread.yield();
                  }

                  if (oper < commitPercent + deletePercent) {
                    verbose("deleting id", id, "val=", nextVal, "version", version);

                    Long returnedVersion =
                        deleteAndGetVersion(
                            Integer.toString(id),
                            params(
                                "_version_",
                                Long.toString(-version),
                                DISTRIB_UPDATE_PARAM,
                                FROM_LEADER));

                    // TODO: returning versions for these types of updates is redundant
                    // but if we do return, they had better be equal
                    if (returnedVersion != null) {
                      assertEquals(-version, returnedVersion.longValue());
                    }

                    // only update model if the version is newer
                    synchronized (model) {
                      DocInfo currInfo = model.get(id);
                      if (Math.abs(version) > Math.abs(currInfo.version)) {
                        model.put(id, new DocInfo(version, -nextVal));
                      }
                    }

                    verbose("deleting id", id, "val=", nextVal, "version", version, "DONE");
                  } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {

                    verbose("deleteByQuery id", id, "val=", nextVal, "version", version);

                    Long returnedVersion =
                        deleteByQueryAndGetVersion(
                            "id:" + Integer.toString(id),
                            params(
                                "_version_",
                                Long.toString(-version),
                                DISTRIB_UPDATE_PARAM,
                                FROM_LEADER));

                    // TODO: returning versions for these types of updates is redundant
                    // but if we do return, they had better be equal
                    if (returnedVersion != null) {
                      assertEquals(-version, returnedVersion.longValue());
                    }

                    // only update model if the version is newer
                    synchronized (model) {
                      DocInfo currInfo = model.get(id);
                      if (Math.abs(version) > Math.abs(currInfo.version)) {
                        model.put(id, new DocInfo(version, -nextVal));
                      }
                    }

                    verbose("deleteByQuery id", id, "val=", nextVal, "version", version, "DONE");

                  } else {
                    verbose("adding id", id, "val=", nextVal, "version", version);

                    Long returnedVersion =
                        addAndGetVersion(
                            sdoc(
                                "id",
                                Integer.toString(id),
                                FIELD,
                                Long.toString(nextVal),
                                "_version_",
                                Long.toString(version)),
                            params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
                    if (returnedVersion != null) {
                      assertEquals(version, returnedVersion.longValue());
                    }

                    // only update model if the version is newer
                    synchronized (model) {
                      DocInfo currInfo = model.get(id);
                      if (version > currInfo.version) {
                        model.put(id, new DocInfo(version, nextVal));
                      }
                    }

                    if (VERBOSE) {
                      verbose("adding id", id, "val=", nextVal, "version", version, "DONE");
                    }
                  }

                  if (!before) {
                    lastId = id;
                  }
                }
              } catch (Throwable e) {
                // stop every other loop (writers, readers and the main thread) on any failure
                operations.set(-1L);
                throw new RuntimeException(e);
              }
            }
          };

      threads.add(thread);
    }

    for (int i = 0; i < nReadThreads; i++) {
      Thread thread =
          new Thread("READER" + i) {
            Random rand = new Random(random().nextInt());

            @Override
            public void run() {
              try {
                while (operations.get() > 0) {
                  // throttle reads (don't completely stop)
                  readPermission.tryAcquire(10, TimeUnit.MILLISECONDS);

                  // bias toward a recently changed doc
                  int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);

                  // when indexing, we update the index, then the model
                  // so when querying, we should first check the model, and then the index

                  boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
                  DocInfo info;

                  if (realTime) {
                    info = visibleModel.get(id);
                  } else {
                    synchronized (globalLock) {
                      info = committedModel.get(id);
                    }
                  }

                  if (VERBOSE) {
                    verbose("querying id", id);
                  }
                  SolrQueryRequest sreq;
                  if (realTime) {
                    sreq = req("wt", "json", "qt", "/get", "ids", Integer.toString(id));
                  } else {
                    sreq =
                        req("wt", "json", "q", "id:" + Integer.toString(id), "omitHeader", "true");
                  }

                  String response = h.query(sreq);
                  @SuppressWarnings({"rawtypes"})
                  Map rsp = (Map) Utils.fromJSONString(response);
                  @SuppressWarnings({"rawtypes"})
                  List doclist = (List) (((Map) rsp.get("response")).get("docs"));
                  if (doclist.size() == 0) {
                    // there's no info we can get back with a delete operation, so not much we can
                    // check without further synchronization
                  } else {
                    assertEquals(1, doclist.size());
                    long foundVal = (Long) (((Map) doclist.get(0)).get(FIELD));
                    long foundVer = (Long) (((Map) doclist.get(0)).get("_version_"));
                    // The index must never be behind the model we read beforehand, and if the
                    // version matches, the val must match too.
                    if (foundVer < Math.abs(info.version)
                        || (foundVer == info.version && foundVal != info.val)) {
                      verbose("ERROR, id=", id, "found=", response, "model", info);
                      fail("id=" + id + " found=" + response + " model=" + info);
                    }
                  }
                }

                // runs at most once per reader, after its query loop has finished
                if (rand.nextInt(100) < percentGetLatestVersions) {
                  getLatestVersions();
                  // TODO: some sort of validation that the latest version is >= to the latest
                  // version we added?
                }

              } catch (Throwable e) {
                // stop every other loop (writers, readers and the main thread) on any failure
                operations.set(-1L);
                throw new RuntimeException(e);
              }
            }
          };

      threads.add(thread);
    }

    for (Thread thread : threads) {
      thread.start();
    }

    int bufferedAddsApplied = 0;
    do {
      assertSame(UpdateLog.State.ACTIVE, uLog.getState());

      // before we start buffering updates, we want to point
      // visibleModel away from the live model.

      visibleModel = new ConcurrentHashMap<>(model);

      synchronized (stateChangeLock) {
        uLog.bufferUpdates();
      }

      assertSame(UpdateLog.State.BUFFERING, uLog.getState());

      // sometimes wait a few milliseconds to allow time for writers to write something
      if (random().nextBoolean()) {
        Thread.sleep(random().nextInt(10) + 1);
      }

      Future<UpdateLog.RecoveryInfo> recoveryInfoF = uLog.applyBufferedUpdates();
      if (recoveryInfoF != null) {
        UpdateLog.RecoveryInfo recInfo = null;

        int writeThreadNumber = 0;
        int cnt = 5000; // upper bound on the number of polls before giving up on this round
        while (recInfo == null) {
          try {
            // wait a short period of time for recovery to complete (and to give a chance for more
            // writers to concurrently add docs)
            cnt--;
            recInfo =
                recoveryInfoF.get(random().nextInt(100 / nWriteThreads), TimeUnit.MILLISECONDS);
          } catch (TimeoutException e) {
            // idle one more write thread
            verbose(
                "Operation",
                operations.get(),
                "Draining permits for write thread",
                writeThreadNumber);
            writePermissions[writeThreadNumber++].drainPermits();
            if (writeThreadNumber >= nWriteThreads) {
              // if we hit the end, back up and give a few write permits
              writeThreadNumber--;
              writePermissions[writeThreadNumber].release(random().nextInt(2) + 1);
            }

            // throttle readers, so they don't steal too much CPU from the recovery thread
            readPermission.drainPermits();
          }
          if (cnt == 0) {
            break;
          }
        }
        if (recInfo != null) {
          bufferedAddsApplied += recInfo.adds;
        }
      }

      // put all writers back at full blast
      for (Semaphore writePerm : writePermissions) {
        // I don't think semaphores check for overflow, so we need to check how many remain
        int neededPermits = Integer.MAX_VALUE - writePerm.availablePermits();
        if (neededPermits > 0) {
          writePerm.release(neededPermits);
        }
      }

      // put back readers at full blast and point back to live model
      visibleModel = model;
      int neededPermits = Integer.MAX_VALUE - readPermission.availablePermits();
      if (neededPermits > 0) {
        readPermission.release(neededPermits);
      }

      verbose("ROUND=", operations.get());
    } while (operations.decrementAndGet() > 0);

    verbose("bufferedAddsApplied=", bufferedAddsApplied);

    for (Thread thread : threads) {
      thread.join();
    }
  }
}
